Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add broadcast::Sender<T>::closed #6685

Merged
merged 1 commit into from
Jan 9, 2025

Conversation

evanrittenhouse
Copy link
Contributor

@evanrittenhouse evanrittenhouse commented Jul 14, 2024

Motivation

Adds a closed Future to broadcast::Sender<T>, similar to the oneshot or mpsc variants, which completes when all subscribed receivers have been dropped.

Solution

This is a simple poll_fn which wraps a check around shared.tail.rx_cnt, returning Ready if the remaining count is 0.

Closes: #6649

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Jul 14, 2024
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Jul 14, 2024
@Darksonn
Copy link
Contributor

What's the status on this?

@evanrittenhouse
Copy link
Contributor Author

Work has been nuts - haven't had much time for anything else. This is on my backlog though

@Darksonn
Copy link
Contributor

No rush from my side. Just wanted to check in.

@evanrittenhouse
Copy link
Contributor Author

evanrittenhouse commented Aug 19, 2024

Pushed up what should be closer to the proper implementation. I'd like to add a task.is_woken() check after each rx is dropped in the integration test, but for some reason, dropping rx2 doesn't wake the task (though it does make it Ready).

I'll dig more into that when I have a bit more time, as well as fix the various CI issues and clean up the code (for example the tail lock will drop when it goes out of scope).

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
Comment on lines 849 to 861
self.shared.notify_rx_drop.notified().await;

poll_fn(|_| {
let tail = self.shared.tail.lock();

if tail.closed || tail.rx_cnt == 0 {
return Poll::Ready(());
}

drop(tail);
return Poll::Pending;
})
.await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poll fn needs to interact with the notified() future for this to work. Check out this for inspiration:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
loop {
if this.cancellation_token.is_cancelled() {
return Poll::Ready(());
}
// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
this.future.set(this.cancellation_token.inner.notified());
}

You should be able to do something similar without creating a whole future struct if you use tokio::pin! on the notified() future.

Copy link
Contributor Author

@evanrittenhouse evanrittenhouse Aug 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we now notify only once, when the last receiver drops, can we just await the Notified directly? I originally had this block because I notified on every Rx drop, so had to include an additional check to ensure that we had just dropped the last Rx.

I'm also trying to get a better understanding of the internals here. For my own understanding - this doesn't work because, after the first call, the internal Future state machine is progressed to the point where we're already in the poll_fn and have a Pending from a non-terminal Rx dropping. That means that we'd never await the notified() again. Is that correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Notified doesn't have spurious wakeups in the scenario we are using it, so it should work. Well, except for the fact that we probably want it to return immediately if it's already closed when we call it. We can do it like this:

loop {
    let notified = self.shared.notify_rx_drop.notified();
    if self.is_closed() { return; }
    notified.await;
}

Copy link
Contributor Author

@evanrittenhouse evanrittenhouse Sep 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done. I used tail.closed instead of self.is_closed() as I couldn't find an is_closed on Sender, which means an extra lock.

For my own curiosity, what's the advantage of the loop above vs. a simple self.shared.notify_rx_drop.notified().await, if Notified doesn't suffer from spurious wakeups? Now that we only notify once when the last receiver drops, it may be a nice way to avoid that extra lock I mentioned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't use a loop, then it changes the behavior of the test I requested in #6685 (comment). Which behavior we want is up for discussion.

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
@evanrittenhouse evanrittenhouse marked this pull request as draft August 19, 2024 11:31
@evanrittenhouse evanrittenhouse force-pushed the evanrittenhouse/6649 branch 3 times, most recently from eb0d891 to 43be386 Compare September 22, 2024 15:16
@evanrittenhouse evanrittenhouse marked this pull request as ready for review September 22, 2024 15:16
@evanrittenhouse evanrittenhouse force-pushed the evanrittenhouse/6649 branch 2 times, most recently from 2ebc20b to e170736 Compare November 3, 2024 01:20
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
@evanrittenhouse evanrittenhouse force-pushed the evanrittenhouse/6649 branch 4 times, most recently from a4980b3 to ddb1df2 Compare December 27, 2024 02:49
@evanrittenhouse
Copy link
Contributor Author

Last failure seems to just be docs-related, so I think we just have to kick the job

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@Darksonn Darksonn changed the title sync: add Sender<T>::closed future sync: add broadcast::Sender<T>::closed Jan 9, 2025
@Darksonn Darksonn merged commit 5c8cd33 into tokio-rs:master Jan 9, 2025
81 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add broadcast::Sender::closed Future
4 participants